package com.antball.v1;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Minimal Kafka consumer demo: loads connection settings from
 * {@code consumer.properties} on the classpath, subscribes to the
 * {@code hadoop} topic, and prints offset/partition/key/value for every
 * record it polls, forever.
 *
 * @author huangsj
 * @since 2018/8/23
 */
public class ConsumerDemo {

    public static void main(String[] args) throws IOException {

        Properties properties = new Properties();
        // try-with-resources: the stream was previously leaked, and a missing
        // resource would have surfaced as an NPE instead of a clear error.
        try (InputStream in = ConsumerDemo.class.getClassLoader()
                .getResourceAsStream("consumer.properties")) {
            if (in == null) {
                throw new IOException("consumer.properties not found on classpath");
            }
            properties.load(in);
        }

        // KafkaConsumer is AutoCloseable; close it even if poll() throws so
        // the group coordinator can rebalance promptly.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {

            // Subscribe to the topic; partition assignment is handled by the
            // consumer group coordinator.
            Collection<String> topics = Arrays.asList("hadoop");
            consumer.subscribe(topics);

            while (true) {
                // Block for up to 1000 ms waiting for records.
                // NOTE(review): poll(long) is deprecated in kafka-clients 2.0+;
                // switch to poll(Duration.ofMillis(1000)) once on that version.
                ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                // Print each fetched record.
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long offset = consumerRecord.offset();
                    int partition = consumerRecord.partition();
                    String key = consumerRecord.key();
                    String value = consumerRecord.value();
                    System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
                }
            }
        }
    }

}
